Skip to content

Conversation

@olix0r
Copy link
Member

@olix0r olix0r commented Nov 6, 2025

When unsubscribing a stream from a federated service, we:

  1. For each cluster, send an update on the stream's channel to remove endpoints.
  2. Call synchronizedGetStream.Stop() so that it stops processing updates.

These steps can race and deadlock and task leak: if Stop() has been called before endoint updates are processed, the subsequent Send() calls block forever.

This change improves test coverage to cover this case as well as that fixed in f4e6795.

…lock indefinitely

When unsubscribing a stream from a federated service, we:

1. For each cluster, send an update on the stream's channel to remove endpoints.
2. Call synchronizedGetStream.Stop() so that it stops processing updates.

These steps can race and deadlock: if Stop() has been called before endoint
updates are processed, the subsequent Send() calls block forever.

This change improves test coverage to cover this case as well as that fixed in
f4e6795.
@olix0r olix0r requested a review from a team as a code owner November 6, 2025 18:40
@olix0r olix0r changed the title fix(destination): ensure that synchronizedGetStream.Send() does not b… fix(destination): ensure that synchronizedGetStream.Send() does not block on close Nov 6, 2025
@olix0r olix0r changed the title fix(destination): ensure that synchronizedGetStream.Send() does not block on close fix(destination): prevent task leak in federated service watchers Nov 6, 2025
@olix0r olix0r requested a review from Copilot November 6, 2025 18:46
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds robust error handling to synchronizedGetStream.Send() to prevent goroutine leaks when the stream is stopped, along with comprehensive test coverage for the new behavior.

  • Modified Send() method to return errStreamStopped error when called after Stop(), preventing indefinite blocking
  • Added two test cases verifying that Send() returns promptly after Stop() is called, even when the inner stream is blocked

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

File Description
controller/api/destination/syncronized_get_stream.go Updated Send() method with graceful shutdown logic using select statements to check if stream is stopped before and during channel send operations
controller/api/destination/syncronized_get_stream_test.go Added comprehensive test coverage with two test functions and a blocking mock server to verify Send behavior after Stop is called

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Member

@zaharidichev zaharidichev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Solid test coverage

@olix0r olix0r enabled auto-merge (squash) November 6, 2025 18:58
@olix0r olix0r merged commit 8dca484 into main Nov 6, 2025
39 checks passed
@olix0r olix0r deleted the ver/sync-get-fix branch November 6, 2025 19:17
Copy link
Member

@adleong adleong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants